package com.ruyuan.eshop.common.concurrent;

import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author zhonghuashishan
 */
public class SafeThreadPool {

    private final Semaphore semaphore;

    private final ThreadPoolExecutor threadPoolExecutor;

    public SafeThreadPool(String name, int permits) {
        // 如果超过了100个任务同时要运行，会通过semaphore信号量阻塞
        semaphore = new Semaphore(permits);

        // 为什么要这么做，corePoolSize是0？
        // 消息推送这块，并不是一直要推送的，促销活动、发优惠券，正常情况下是不会推送
        // 发送消息的线程池，corePoolSize是0，空闲把线程都回收掉就挺好的
        threadPoolExecutor = new ThreadPoolExecutor(
                0,
                permits * 2,
                60,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                NamedDaemonThreadFactory.getInstance(name)
        );
    }

    public void execute(Runnable task) {
        // 超过了100个batch要并发推送，就会在这里阻塞住
        // 在比如说100个线程都在繁忙的时候，就不可能说有再超过100个batch要同时提交过来
        // 极端情况下，最多也就是100个batch可以拿到信号量，100 * 2的max容量
        semaphore.acquireUninterruptibly();

        threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } finally {
                semaphore.release();
            }
        });
    }
}